fix(ingest/snowflake): resolve external stage lineage via DataHub graph#17358
fix(ingest/snowflake): resolve external stage lineage via DataHub graph#17358alokr-dhub wants to merge 7 commits into
Conversation
Codecov Report❌ Patch coverage is 📢 Thoughts on this report? Let us know! |
Connector Tests ResultsConnector tests failed for commit To skip connector tests, add the Autogenerated by the connector-tests CI pipeline. |
|
Linear: ING-2599 |
…raph and external_stage_platform_instance fields Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
|
🔴 Meticulous spotted visual differences in 116 of 1330 screens tested: view and approve differences detected. Meticulous evaluated ~10 hours of user flows against your PR. Last updated for commit |
…tform_instance field The merge of the marketplace feature (#14304) into this branch dropped the closing ) for the external_stage_platform_instance Field() definition, producing a SyntaxError on import. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
|
Your PR has been assigned to @sgomezvillamor (sergio.gomez) for review (ING-2599). |
…add path resolver utility - Extract DataLakePathResolver to data_lake_common/: a reusable bucket-scoped path -> existing-URN graph-lookup utility with per-run caching. One graph call per bucket regardless of how many stage paths share it. - Catch InvalidUrnError alongside ValueError when parsing stage URLs so a malformed gcs:// (empty path) no longer crashes extraction. - Make ParsedStageUrl, StageLookupEntry, and DataLakeUrnLookup frozen dataclasses with Tuple[str, ...] URN lists; construct StageLookupEntry once with its final URNs. Bucket is derived from path via @Property so the invariant holds by construction. - Replace lazy _get_path_resolver/assert pattern with __post_init__ construction; raise RuntimeError instead of asserting so the guard stays loud under python -O. - Replace O(n^2) input_datasets dedup with dict.fromkeys (preserves order). - Add tests for stages_without_urn warning path, gcs empty-path parse, and the DataLakePathResolver bucket-boundary contract. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Problem
The Snowflake connector previously resolved external stage lineage by generating a single dataset URN directly from the raw stage URL (e.g.
s3://my-bucket/data/→ one S3 URN). This produces incorrect lineage because:platform_instance, breaking the lineage graph.Solution
Graph-based stage resolution (opt-in)
When
resolve_external_stage_lineage_via_graphis enabled, the connector looks up every existing dataset URN in DataHub whose path is rooted at the stage's URL path. A single stage can therefore fan out to multiple upstream datasets that the corresponding data lake source previously ingested.Changes
data_lake_common/path_urn_resolver.py—DataLakePathResolverbulk-fetches every dataset URN under a bucket once per(platform, platform_instance, bucket)and matches client-side. N stage paths sharing a bucket cost a single graph call, not N. Designed to be reusable across connectors (RedshiftUNLOAD, external tables, etc.) and intentionally report-free; transient failures are surfaced viaDataLakeUrnLookup.transient_errorfor callers to log and count.SnowflakeStagesExtractor— added_parse_external_stage_url,_compute_external_stage_urns, and_resolve_stage_via_graph. URL resolutions are cached per stage URL within a run; transient graph failures are not cached so a later stage with the same URL can retry.StageLookupEntry.dataset_urn: Optional[str]→dataset_urns: Tuple[str, ...]— stages can now resolve to multiple upstream datasets. The dataclass is frozen and constructed once with its final URNs.ParsedStageUrl— frozen dataclass;bucketis derived frompathvia a@propertyso the invariant holds by construction.snowflake_pipes.py— pipe extractor fans out every resolved stage URN intoinputDatasetsand deduplicates across stages usingdict.fromkeys(insertion-order preserving, O(n)).snowflake_config.py— two new fields:resolve_external_stage_lineage_via_graph(defaultFalse) andexternal_stage_platform_instance(defaultNone). Validators normalize blank platform instances toNoneand warn whenexternal_stage_platform_instanceis set without the resolve flag.snowflake_report.py— addedexternal_stage_lineage_resolvedandexternal_stage_lineage_unresolvedcounters.snowflake_v2.py— passesctx.graphtoSnowflakeStagesExtractor._compute_external_stage_urnsnow also catchesInvalidUrnErrorso a malformedgcs://(empty path is rejected by the URN constructor) no longer crashes extraction; the existing structured warning is emitted instead.datetimeobjects to usetzinfo=timezone.utcso containerized CI in non-UTC zones is deterministic.False-positive prevention
A
START_WITHURN filter fors3://my-bucket/folderwould incorrectly matchs3://my-bucket/folder_other. Thedataset_path_is_rooted_athelper (inpath_urn_resolver.py) rejects such false positives by requiring the character immediately after the prefix to be/(child path) or,(URN env separator, i.e. exact match). The same check also drops sibling buckets that the case-insensitiveSTART_WITHwildcard might over-match.Behaviour without the flag
When
resolve_external_stage_lineage_via_graphisFalse(default), the connector falls back to the existing path-based URN generation — no change in behaviour for existing users.Test plan
tests/unit/data_lake_common/test_path_urn_resolver.py— covers the bucket-scoped fetch contract (one fetch per bucket, separate buckets fetched separately, scope is the bucket not the full path), sibling-bucket exclusion, transient-error pass-through with no caching, and thedataset_path_is_rooted_atboundary check parametrized across child/exact/sibling/wrong-platform/malformed inputs.TestGraphResolvedExternalStageLineage— S3/GCS/ABS resolution, empty matches, disabled-flag fallback, graph errors (with and without cache poisoning), platform-instance forwarding, URL caching, unsupported schemes, empty paths (boths3://andgcs://), internal stages, and malformed URLs.TestExternalStageConfigValidators— blank/whitespace normalization, stripping, and the cross-field warning.TestSnowflakePipesExtractor— fan-out across multiple resolved URNs, cross-stage deduplication, and thestages_without_urnwarning path (resolved/empty mix and the all-empty case)../gradlew :metadata-ingestion:lint— ruff + mypy clean../gradlew :metadata-ingestion:testQuick— 546 Snowflake unit tests pass.🤖 Generated with Claude Code